import json
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer


class KafkaProducer:
    """JSON-publishing wrapper around ``AIOKafkaProducer``.

    The wrapper tracks whether the underlying producer has been started so
    that ``start`` and ``stop`` can be called repeatedly without starting or
    stopping the underlying producer twice in a row.
    """

    def __init__(self, servers="localhost:9092"):
        """Build the underlying producer; call ``start`` before sending.

        Args:
            servers: Passed to ``AIOKafkaProducer`` as ``bootstrap_servers``.
                Defaults to ``"localhost:9092"``, the same default that
                ``KafkaConsumer`` uses for its ``servers`` argument.
        """
        self.producer = AIOKafkaProducer(bootstrap_servers=servers)
        # Guards start/stop/send; flipped only after the awaited call returns.
        self._started = False

    async def start(self):
        """Start the underlying producer unless it is already started."""
        if self._started:
            return
        await self.producer.start()
        # Set only after a successful start so a failed start can be retried.
        self._started = True

    async def send(self, topic, message):
        """Serialize ``message`` to UTF-8 JSON and publish it to ``topic``.

        Args:
            topic: Topic name handed to ``send_and_wait``.
            message: Any object accepted by ``json.dumps``.

        Raises:
            RuntimeError: If ``start`` has not been called (or ``stop`` was
                called since).
        """
        if not self._started:
            raise RuntimeError("Kafka producer not started")
        payload = json.dumps(message).encode("utf-8")
        await self.producer.send_and_wait(topic, payload)

    async def stop(self):
        """Stop the underlying producer if it is currently started."""
        if not self._started:
            return
        await self.producer.stop()
        self._started = False



class KafkaConsumer:
    """JSON-decoding wrapper around ``AIOKafkaConsumer`` for a single topic."""

    def __init__(self, topic, servers="localhost:9092", group_id="qq_bot_group"):
        """Build the underlying consumer; consumption begins in ``consume``.

        Args:
            topic: Topic name to subscribe to; also kept on ``self.topic``.
            servers: Passed to ``AIOKafkaConsumer`` as ``bootstrap_servers``.
            group_id: Passed to ``AIOKafkaConsumer`` as ``group_id``.
                Defaults to ``"qq_bot_group"``.
        """
        self.topic = topic
        self.consumer = AIOKafkaConsumer(
            topic,
            bootstrap_servers=servers,
            group_id=group_id,
        )

    async def consume(self):
        """Start the consumer and yield each message value decoded from JSON.

        This is an async generator. The underlying consumer is stopped when
        the generator finishes, is closed, or an exception propagates out of
        the loop (including a decode error from ``json.loads``).

        Yields:
            The result of ``json.loads(msg.value)`` for every received message.
        """
        # start() stays outside the try block: if it fails, stop() is not
        # called on a consumer that never started.
        await self.consumer.start()
        try:
            async for msg in self.consumer:
                yield json.loads(msg.value)
        finally:
            await self.consumer.stop()
